# 50. multiprocessing进程管理模块
仔细说来,multiprocessing不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。
# multiprocessing.process进程模块 - 重点
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
# process模块使用简单介绍
Process(group,target,name,args,kwargs),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
#常用
target= ##指定某一个方法或类开启子进程线程并运行,表示调用对象,即子进程要执行的任务
args=(,) ##指定传输给target函数位置的数据,元组形式,需要有逗号,表示调用对象的位置参数元组,args=(1,2,'egon',)
# 其他参数
group #参数未使用,值始终为None
kwargs #表示调用对象的字典,kwargs={'name':'egon','age':18}
name #为子进程的名称
# process模块方法介绍
# 启动进程 - .start()
启动进程,并调用该子进程中的p.run()
from multiprocessing import Process
def so(n):
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
执行结果:
1
**注意:**启动进程的时候要加上if name == 'main':,为什么要加呢,因为开启进程,Python解释器需要你要当前执行文件开启,如果判断你是否在当前文件执行开启,还是用自定义模块开启,所以就需要加上if name == 'main':
# 启动进程 - .run()
进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
from multiprocessing import Process
def so(n):
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.run()
执行结果:
1
# 强制终止进程 - .terminate()
强制终止进程,不会进行任何清理操作,如果创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果还保存了一个锁那么也将不会被释放,进而导致死锁
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
pc.terminate()
使用了time模块的停止方法,让子进程暂时保留,让terminate()去结束这个子进程
# 检查子进程是否存活 - .is_alive()
如果子进程仍然运行,返回True
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
print(pc.is_alive())
执行结果:
True
1
# 等待子进程执行完毕 - .join()
主进程等待终止(强调:是主进程处于等的状态,而子进程是处于运行的状态)。timeout是可选的超时时间,需要强调的是,.join只能join住start开启的进程,而不能join住run开启的进程
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
pc.join()
print(2)
执行结果:
1
2
# 主进程作为子进程的守护进程 .daemon()
默认值为False,如果设为True,代表子进程为后台运行的守护进程,当子进程的父进程终止时,子进程也随之终止,并且设定为True后,子进程不能创建自己的新进程,必须在.start()之前设置
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.daemon = True
pc.start()
# 设置并查看子进程的名称 - .name
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
pc.name = "name" ## 设置子进程名称
print(pc.name) ## 查看子进程名称
执行结果:
name
1
# 查看子进程的PID - .pid()
from multiprocessing import Process
import time
def so(n):
time.sleep(1)
print(1)
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
print(pc.pid)
执行结果:
17332
1
# 不常用的方法
.exitcode
## 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
.authkey
##进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
# 在windows使用process模块的注意事项
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if name ==‘main’ 判断保护起来,import 的时候 ,就不会递归运行了。 使用process模块创建进程
# process模块的使用
在上面的方法介绍中,也知道了process模块的大概使用方法,其他process模块也可以用类继承方式来开启子进程,下面会讲,现在主要讲的是,process模块的一些使用
# 查看主进程跟子进程的PID
因为process模块没有提供查看主进程的方法,所以这里要使用os模块来查看
from multiprocessing import Process
import os
def so(n):
print("os模块的子进程PID:",os.getpid())
print("os模块的子进程的父进程PID:", os.getppid())
pc = Process(target=so,args=(1,))
if __name__ == '__main__':
pc.start()
print("process模块的子进程PID:",pc.pid)
print("os模块的主进程PID:", os.getpid())
执行结果:
process模块的子进程PID: 2344
os模块的主进程PID: 17196
os模块的子进程PID: 2344
os模块的子进程的父进程PID: 17196
# 多个子进程的开启
from multiprocessing import Process
import random
def so(n):
print("这是第%s个子进程"%n)
if __name__ == '__main__':
for i in range(11):
pc = Process(target=so, args=(i,))
pc.start()
print("我想随机获取子进程的数:%s"%random.randint(0,10))
执行结果:
这是第0个子进程
这是第1个子进程
这是第3个子进程
这是第2个子进程
这是第4个子进程
我想随机获取子进程的数:2
这是第5个子进程
这是第7个子进程
这是第6个子进程
这是第9个子进程
这是第8个子进程
这是第10个子进程
这样子,我想随机获取子进程的数,显示就在其他进程显示的前面,那要等所有进程执行完毕,才能接着执行
使用列表表达式
from multiprocessing import Process
import random
def so(n):
print("这是第%s个子进程"%n)
if __name__ == '__main__':
li = []
for i in range(11):
pc = Process(target=so, args=(i,))
pc.start()
li.append(pc)
[pc.join() for i in li]
print("我想随机获取子进程的数:%s"%random.randint(0,10))
执行结果:
这是第0个子进程
这是第1个子进程
这是第3个子进程
这是第2个子进程
这是第4个子进程
这是第7个子进程
这是第5个子进程
这是第6个子进程
这是第8个子进程
这是第9个子进程
这是第10个子进程
我想随机获取子进程的数:3
# 使用继承类方式来开启子进程
from multiprocessing import Process
class so(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self):
print("我是%s"%self.name)
if __name__ == '__main__':
pc = so("子进程")
pc.start()
执行结果:
我是子进程
## 继承类方式来使用,默认执行__init__方法后,执行run方法
# 守护进程
会随着父进程的结束而结束。
父进程创建守护进程
- 其一:守护进程会在父进程代码执行结束后就终止
- 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,父进程代码运行结束,守护进程随即终止
from multiprocessing import Process
import time
def func():
print('子进程开始执行')
time.sleep(2)
print('子进程结束执行')
if __name__ == '__main__':
print('主进程开始执行')
p = Process(target=func,)
p.daemon = True# 将p 设置为守护进程,此代码一定要在start之前设置。
p.start()
time.sleep(1)
print('主进程结束执行')
执行结果:
主进程开始执行
子进程开始执行
主进程结束执行
from multiprocessing import Process
import time
def func2():
print('子进程2开始执行')
time.sleep(2)
print('子进程2结束执行')
def func1():
print('子进程1开始执行')
time.sleep(2)
print('子进程1结束执行')
if __name__ == '__main__':
print('主进程开始执行')
p1 = Process(target=func1,)
p2 = Process(target=func2)
p1.daemon = True# 将p1 设置为守护进程,此代码一定要在start之前设置。
p1.start()
p2.start()
time.sleep(1)# 此时p1 p2 和main 都已经开始执行
print('主进程结束执行')# 当主进程打印完这句话,代表主进程结束,守护进程p1肯定随之结束
# 但是p2 不是守护进程,不会结束,所以此时程序(也就是主进程)会等待p2结束之后才结束。
执行结果:
主进程开始执行
子进程1开始执行
子进程2开始执行
主进程结束执行
子进程2结束执行
# socket tcp协议并发实现聊天
## 服务端文件
from multiprocessing import Process
import socket
from socket import SOL_SOCKET,SO_REUSEADDR
import time
SERVER_ADDR = ('127.0.0.1',8080)
sk = socket.socket()
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(SERVER_ADDR)
sk.listen(5)
def func(conn):
while 1:
try:
msg_r = conn.recv(1024).decode('utf-8')
print(msg_r)
time.sleep(3)
if not msg_r:break
conn.send(msg_r.upper().encode('utf-8'))
except:
break
if __name__ == '__main__':
while 1:
conn,addr = sk.accept()
p = Process(target=func,args=(conn,))
p.start()
sk.close()
## 客户端文件
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
while 1:
msg_s = input('>>>')
if not msg_s:continue
sk.send(msg_s.encode('utf-8'))
print(sk.recv(1024).decode('utf-8'))
sk.close()
# 多进程中其他方法
# 进程的terminate和is_alive方法
from multiprocessing import Process
import random
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess, self).__init__()
self.name = name# name是父类Process中的属性,这里相当于给子进程命名
def run(self):
print('%s 正在撩小姐姐'%self.name)
time.sleep(random.randint(1,3))
print('%s 还在撩小姐姐'%self.name)
if __name__ == '__main__':
p = MyProcess('江凡')
p.start()
time.sleep(0.1)
p.terminate()# 将p进程杀死的命令。 将任务提交给操作系统,操作系统什么时候执行不受用户决定
print(p.is_alive())# 判断p进程是否还存在
time.sleep(1) ## 如果不加sleep来控制,那么可能的结果都是True,因为系统可能还没有强制结束子进程
print(p.is_alive())# 判断p进程是否还存在
执行结果:
True
False
# 进程的name和pid属性
from multiprocessing import Process
import random
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess, self).__init__()
self.name = name# name是父类Process中的属性,这里相当于给子进程命名
def run(self):
print('%s 正在撩小姐姐'%self.name)
time.sleep(random.randint(1,3))
print('%s 还在撩小姐姐'%self.name)
if __name__ == '__main__':
p = MyProcess('江凡')
p.start()
print(p.name,p.pid)# 打印进程名字,进程id号
执行结果:
江凡 5808
江凡 正在撩小姐姐
江凡 还在撩小姐姐
# multiprocessing.Value共享变量模块 - 了解
这个模块有很多的限制,比如转入的类型限制,目前没有深入研究
# 共享变量类型
对于共享整数或者单个字符,初始化比较简单,参照下图映射关系即可
Type Code | C Type | Python Type |
---|---|---|
'c' | char | character |
'b' | signed char | int |
'B' | unsigned char | int |
'u' | Py_UNICODE | unicode character |
'h' | signed short | int |
'H' | unsigned short | int |
'i' | signed int | int |
'I' | unsigned int | int |
'l' | signed long | int |
'L' | unsigned long | int |
'f' | float | float |
'd' | double | float |
如果共享的是字符串,则在上表是找不到映射关系的,就是没有code可用。所以我们需要使用原始的ctype类型
比如
from multiprocessing import Value
from ctypes import c_char_p
ss = Value(c_char_p, 'ss')
ctype类型可从下表查阅
ctypes type | C type | Python type |
---|---|---|
c_bool | _Bool | bool (1) |
char | char | 1-character string |
c_wchar | wchar_t | 1-character unicode string |
c_byte | char | int/long |
c_ubyte | unsigned char | int/long |
c_short | short | int/long |
c_ushort | unsigned short | int/long |
c_int | int | int/long |
c_uint | unsigned in | int/long |
c_long | long | int/long |
c_ulong | unsigned long | int/long |
c_longlong | __int64 or long long | int/long |
c_ulonglong | unsigned __int64 or unsigned long long | int/long |
c_float | float | float |
c_double | double | float |
c_longdouble | long double | float |
c_char_p | char * (NUL terminated) | string or None |
c_wchar_p | wchar_t * (NUL terminated) | unicode or None |
c_void_p | void * | int/long or None |
上面这些表都是从网上的拷贝的,对于字符串类型的数据共享,目前还暂时没研究
# 共享变量简单使用
def val(so):
so.value += 2
print(so.value)
if __name__ == '__main__':
so = Value("i",100)
pr = Process(target=val,args=(so,))
pr.start()
pr.join()
print(so.value)
执行结果:
102
102
## 共享变量,需要把变量转进子进程里
## 子进程修改了共享变量,在主进程的共享变量也会跟着修改
# multiprocessing.Lock锁模块 - 重点
刚刚的文章,千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序(或者说由操作系统调度决定他们的顺序),一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给带来了新的问题。
# Lock模块的功能介绍
Lock模块的功能就只有二个,一个是解锁,一个是锁上
from multiprocessing import Lock
lc = Lock() ## 实例化Lock类,不用加参数
lc.acquire() ## 锁上,把以下的资源全部锁上,不让其他进程操作,除非解锁了
lc.release() ## 解锁,把以上的资源释放了,可以让其他进程进行操作
# 锁的用处
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题,
可以使用加锁的形式确保了程序的顺序执行,但是执行又变成了串行,降低了效率,但是不得不说,它确保了数据的安全性。
让一份数据资源,保证只有一个进程进行操作,这样才不会出现数据混乱的现象
# 实验 - 模拟银行取钱存钱
银行无论是取钱跟存钱,都是要存入数据库中的,如果没有锁的存在,那么同时存钱跟取钱的操作,有可能会让账户的余额数据混乱
## 钱文件
1000
## 程序文件
from multiprocessing import Lock,Process
import time
def draw(i,lc): ## 取钱
for s in range(2):
lc.acquire()
with open("money") as f:
conn = int(f.read())
time.sleep(0.1) ## 如果不加暂时,可能系统还没读出,就开始下面的if判断了
if conn > i :
conn = conn - i
print("成功取出%s元,目前账户还剩下%s元"%(i,conn))
with open("money","w") as f:
f.write(str(conn))
else:
print("账户余额不足%s元,剩下%s元,无法取出"%(i,conn))
lc.release()
def save(i,lc): ## 存钱
for s in range(2):
lc.acquire()
with open("money") as f:
conn = int(f.read())
time.sleep(0.1)
conn = conn + i
print("成功存入%s元,目前账户还共有%s元" % (i, conn))
with open("money", "w") as f:
f.write(str(conn))
lc.release()
if __name__ == '__main__':
lc = Lock()
pr = Process(target=draw,args=(5,lc)) ## 取钱
pr.start()
pr1 = Process(target=save,args=(5,lc)) ## 存钱
pr1.start()
执行结果:
成功取出5元,目前账户还剩下995元
成功存入5元,目前账户还共有1000元
成功取出5元,目前账户还剩下995元
成功存入5元,目前账户还共有1000元
# 实验 - 模拟12306抢票
抢票,如果不用锁的吧,那么同时有多人抢票,那是不是都抢到票了,但是票只有一个,所以要使用锁
## 钱文件
2
## 程序文件
from multiprocessing import Lock,Process
import time
def Shakedown(lc): ## 抢票
lc.acquire()
for i in range(1,4):
with open("ticket") as f:
coon = int(f.read())
if coon > 0:
coon = coon - 1
print("第%s位用户成功购买了前往北京的车票,剩余的车票数为%s张"%(i,coon))
with open("ticket","w") as f:
f.write(str(coon))
else:
print("第%s位用户抢购失败,当前前往北京的车票数为%s"%(i,coon))
lc.release()
def ticket(lc): ## 查票
lc.acquire()
for i in range(1,4):
with open("ticket") as f:
coon = int(f.read())
print("第%s位用户查看了前往北京的车票数为%s张"%(i,coon))
lc.release()
if __name__ == '__main__':
lc = Lock()
pc = Process(target=ticket,args=(lc,))
pc.start()
time.sleep(0.1)
pc1 = Process(target=Shakedown,args=(lc,))
pc1.start()
执行结果:
第1位用户查看了前往北京的车票数为2张
第2位用户查看了前往北京的车票数为2张
第3位用户查看了前往北京的车票数为2张
第1位用户成功购买了前往北京的车票,剩余的车票数为1张
第2位用户成功购买了前往北京的车票,剩余的车票数为0张
第3位用户抢购失败,当前前往北京的车票数为0
# multiprocessing.Semaphore信号量模块-了解
上述讲的Lock,属于互斥锁,也就是一把钥匙配备一把锁,同时只允许锁住某一个数据。而信号量则是多把钥匙配备多把锁,也就是说同时允许锁住多个数据。
信号量同步基于内部计数器,用户初始化一个计数器初值,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1。当计数器为0时,acquire()调用被阻塞。这是迪科斯彻(Dijkstra)信号量概念P()和V()的Python实现。信号量同步机制适用于访问像服务器这样的有限资源。信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念
# Semaphore模块的功能介绍
Semaphore模块的功能就只有二个,一个是解锁,一个是锁上,能解锁的数量是由实例化的时候控制的
from multiprocessing import Semaphore
s = Semaphore(4) ## 实例化一个对象,参数为 n ,能有几个可以解锁的,4为有4个钥匙可以解锁
s.acquire() ## 锁上,拿走一把钥匙
s.release() ## 解锁,释放一把钥匙
# Semaphore模块的简单使用
from multiprocessing import Semaphore
s = Semaphore(3)
s.acquire()
print(123)
s.acquire()
print(456)
s.acquire()
print(789)
s.acquire()
print(111)
执行结果:
123
456
789
## 程序执行到789之后的s.acquire(),就会进入阻塞,等待程序释放钥匙
# 实验 - 模拟发㾿接客系统
from multiprocessing import Process,Semaphore
import time
import random
def func(i,sem):
sem.acquire()
print('第%s个人进入小黑屋,拿了钥匙锁上门' % i)
time.sleep(random.randint(3,5))
print('第%s个人出去小黑屋,还了钥匙打开门' % i)
sem.release()
if __name__ == '__main__':
sem = Semaphore(2)# 初始化了一把锁5把钥匙,也就是说允许5个人同时进入小黑屋
# 之后其他人必须等待,等有人从小黑屋出来,还了钥匙,才能允许后边的人进入
for i in range(3):
p = Process(target=func,args=(i,sem,))
p.start()
执行结果:
第1个人进入小黑屋,拿了钥匙锁上门
第0个人进入小黑屋,拿了钥匙锁上门
第1个人出去小黑屋,还了钥匙打开门
第2个人进入小黑屋,拿了钥匙锁上门
第0个人出去小黑屋,还了钥匙打开门
第2个人出去小黑屋,还了钥匙打开门
# multiprocessing.Event事件模块 - 了解
python中的事件机制,主要用于主进程控制其他进程的执行
# Event模块的功能介绍
事件主要提供了三个方法 set、wait、clear
事件处理的机制:全局定义了一个“Flag”(event.is_set()),如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
e = Event() #实例化一个对象
e.set() #将is_set()设为True
e.clear() # 将is_set()设为False
e.wait() #判断is_set的bool值,如果bool为True,则非阻塞,bool值为False,则阻塞
e.is_set() # 标识,查看bool值
# 事件是通过is_set()的bool值,去标识e.wait() 的阻塞状态
# 当is_set()的bool值为False时,e.wait()是阻塞状态
# 当is_set()的bool值为True时,e.wait()是非阻塞状态
# 当使用set()时,是把is_set的bool变为True
# 当使用clear()时,是把is_set的bool变为False
# Event模块的简单使用
from multiprocessing import Event
print(e.is_set())# False wait应该是阻塞住
e.set()# 将is_set 的bool值变为True,将wait变为非阻塞
e.wait()
print(e.is_set())
执行结果:
False
True
#################################################################################
from multiprocessing import Event
print(e.is_set())# False wait应该是阻塞住
e.set()# 将is_set 的bool值变为True,将wait变为非阻塞
e.wait()
print(e.is_set())
print(123)
e.clear()
print(e.is_set())
e.wait()
print(123)
执行结果:
False
True
123
False
## 程序进行阻塞等待
# 实验 - 车行信号灯模拟
from multiprocessing import Event,Process
import time
import random
def tl(tlso,e): ## 红绿灯控制
while 1:
if not e.is_set(): ## 如is_set()为False 则执行下面的码
print("绿灯亮")
e.set() ## 把is_set()更改为True,为非阻塞状态
time.sleep(4) ## 暂停4秒
else: #### 如is_set()为True 则执行下面的代码
print("红灯亮")
e.clear() ## 把is_set()更改为False,为阻塞状态
time.sleep(4) ## 暂停4秒
def car(i,e): ## 车辆控制
e.wait() ## 判断当前是否是阻塞,如果是阻塞就等待,如果不是就执行以下代码
print("第%s辆车开过路口"%i)
if __name__ == '__main__':
e = Event()
pr1 = Process(target=tl,args=(tlso,e))
pr1.start()
for i in range(1,99):
while 1:
if e.is_set(): ## 如果is_set为True,就结束循环,执行以下代码,如果is_set为False,就无限循环,直is_set为True,才执行以下代码
break
time.sleep(random.randint(1,2))
pr = Process(target=car,args=(i,e))
pr.start()
# multiprocess.Queue队列模块 - 重点
队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递
队列是安全的,他自带限制,不能出现多进程同时取同一个数据,不能产生数据混乱的现象
# multiprocess.Queue模块的功能介绍
from multiprocessing import Queue
q = Queue(n) ## 实例化一个对象,n:代表队列的大小,能存储几个值
q.get() ## 接收数据,从队列获取值,阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
q.get_nowait() # 不接收数据,从队列获取值,阻塞,如果有数据直接获取,没有数据就报错
q.put() ## 存入数据,把数据存入在队列中,阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待
q.put_nowait() ## 存入数据,把数据存入在队列中,不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错
q.close() ## 关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果队列 被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.qsize() ## 返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty() ## 如果调用此方法时 队列为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full() ## 如果队列已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。。
q.cancel_join_thread() ## 不会再进程退出时自动连接后台线程。这可以防止join_thread()方法阻塞。
q.join_thread() ## 连接队列的后台线程。此方法用于在调用q.close()方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()方法可以禁止这种行为。
# multiprocess.Queue模块的简单使用
from multiprocessing import Queue
q = Queue(3)
q.put(1)
q.put((2,))
q.put([3])
## 在这里我们存储了三个数据,在实例化的时候就限制队列的大小只有三个,如果在存放,肯定会报错,我们先来个异常处理
try:
q.put_nowait(4) ## 这是不能用put,要用put_nowait,因为put是阻塞等待,如果使用put就会一直等待队列有位置了,put_nowait,就是不阻塞,如果不能写入,就报错,所以要用put_nowait
except:
print("队列满了,如果在存储了")
print(q.get())
print(q.get())
print(q.get())
## 在这里我们取了三个数据,在实例化的时候就限制队列的大小只有三个,如果在取,肯定会报错,我们先来个异常处理
try:
q.get_nowait() ## 这是不能用get,要用get_nowait,因为get是阻塞等待,如果使用get就会一直等待队列有数据了,get_nowait,就是不阻塞,如果接收到数据,就报错,所以要用put_nowait
except:
print("队列没数据了")
执行结果:
队列满了,如果在存储了
1
(2,)
[3]
队列没数据了
# 实验 - 多进程批量获取存入
from multiprocessing import Process, Queue,freeze_support
import random
import os
def put_func(q):
info = str(os.getpid()) + '\t:\t' + str(random.randint(0, 100))
q.put(info)
def get_func(q):
print('%s 获取到数据 :\033[33m; %s \033[0m' % (os.getpid(), q.get()))
if __name__ == '__main__':
# freeze_support() 如果有windows系统开启多进程导致程序崩溃,可尝试调用此函数
q = Queue(5)
l_put = []
l_get = []
for i in range(10):
p_put = Process(target=put_func, args=(q,))
p_put.start()
l_put.append(p_put)
for i in range(10):
p_get = Process(target=get_func, args=(q,))
p_get.start()
l_put.append(p_get)
# [i.join() for i in l_put]
# [i.join() for i in l_get]
更多实验将会基于生产者消费者模型的模式来实验,就在下面
# 基于队列实现生产者消费者模型 - 重点
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产进程和消费进程的工作能力来提高程序的整体处理数据的速度。
举个应用栗子:全栈开发时候,前端接收客户请求,后端处理请求逻辑。当某时刻客户请求过于多的时候,后端处理不过来,此时完全可以借助队列来辅助,将客户请求放入队列中,后端逻辑代码处理完一批客户请求后马上从队列中继续获取,这样平衡两端的效率。
为什么要使用生产者和消费者模式
在进程世界里,生产者就是生产数据的进程,消费者就是消费数据的进程。在多进程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
# 实验 - 基于队列的生产者消费者模型
from multiprocessing import Queue,Process
import time
import random
def get_func(q): ## 消费者
while 1:
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
info = q.get() ## 获取队列中的值
print('\033[32m厂长拿走了%s \033[0m'%info)
def put_func(q): ## 生产者
for i in range(20):
info = '娃娃%s号'%i
q.put(info) ## 把数据存入队列中
print('\033[31m生产了%s \033[0m' % info)
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
if __name__ == '__main__':
q = Queue(5) ## 实例化一个对象,设置队列最大容量为5
p_get = Process(target=get_func, args=(q,)) ## 实例化消费者对象
p_put = Process(target=put_func, args=(q,)) ## 实例化生产者对象
p_get.start() ## 启动消费者
p_put.start() ## 启动生产者
## 基于队列实现的生产者消费者模型,生产者一直在生产娃娃,消费者一直在从队列中获取娃娃,但是消费者因为不知道生产者要生产多少娃娃,也不知道生产者何时就不生产了,所以消费者需要一个死循环一直尝试去从队列中获取娃娃,那么此时问题就出现了,3个进程,主进程开启了两个子进程分别为生产者和消费者,当生产者生产完数据后,生产者结束,消费者一直在尝试接收数据,那么问题就出在了消费者的get方法这里,当get不到数据时候就一直阻塞,那么主进程就一直等待,此时程序就不会结束了
# 实验 - 基于上面实验 - 生产者生产结束标识
解决消费者阻塞接收数据,可以尝试让生产者在生产完数据后,再往队列中放一个结束生产的信号,当消费者接受到信号后,自动的break出死循环即可
from multiprocessing import Queue,Process
import time
import random
def get_func(q): ## 消费者
while 1:
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
info = q.get() ## 获取队列中的值
if info == None: ## 当获取到结束生产的标识时,消费者自动break出死循环
break
print('\033[32m厂长拿走了%s \033[0m'%info)
def put_func(q): ##生产者
for i in range(20):
info = '娃娃%s号'%i
q.put(info) ## 向队列存入值
print('\033[31m生产了%s \033[0m' % info)
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
q.put(None)# 放入一个结束生产的标识,让消费者可以识别到生产的结束
if __name__ == '__main__':
q = Queue(5) ## 实例化一个对象,设置队列最大容量为5
p_get = Process(target=get_func, args=(q,)) ## 实例化消费者对象
p_put = Process(target=put_func, args=(q,)) ## 实例化生产者对象
p_get.start() ## 启动消费者
p_put.start() ## 启动生产者
# 实验 - 基于上面实验 - 主进程生产标识
经过上一个实验,生产者放入的停止生产的标识,放入标识这件事其实交给主进程来做也可以,但是此时就需要主进程获取到生产者什么时候结束生产。
## 第一个版本
from multiprocessing import Queue,Process
import time
import random
def get_func(q): ## 消费者
while 1:
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
info = q.get() ## 获取队列中的值
if info == None: ## 当获取到结束生产的标识时,消费者自动break出死循环
break
print('\033[32m厂长拿走了%s \033[0m'%info)
def put_func(q): ##生产者
for i in range(20):
info = '娃娃%s号'%i
q.put(info) ## 向队列存入值
print('\033[31m生产了%s \033[0m' % info)
time.sleep(random.randint(1, 3)) ## 阻塞,随机阻塞时间
q.put(None)# 放入一个结束生产的标识,让消费者可以识别到生产的结束
if __name__ == '__main__':
q = Queue(5) ## 实例化一个对象,设置队列最大容量为5
p_get = Process(target=get_func, args=(q,)) ## 实例化消费者对象
p_put = Process(target=put_func, args=(q,)) ## 实例化生产者对象
p_get.start() ## 启动消费者
p_put.start() ## 启动生产者
p_put.join() ## 主进程等待生产者进程结束
q.put(None) ## 向队列存入结束标识,因为这一步,只有等生产者进程全部执行完毕后才能执行到
工
## 第二个版本
from multiprocessing import Queue,Process
import time
def consumer(q,name,color):
while 1:
info = q.get()
if info:
print('%s %s 拿走了%s \033[0m'%(color,name,info))
else:# 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识
break# 此时消费者结束即可
# 消费者如何判断,生产者是没来得及生产数据,还是生产者不再生产数据了?
# 如果你尝试用get_nowait() + try 的方式去尝试获得生产者不再生产数据,此时是有问题的。
def producer(q,product):
for i in range(20):
info = product + '的娃娃%s号'%str(i)
q.put(info)
if __name__ == '__main__':
q = Queue(10)
p_pro1 = Process(target=producer,args=(q,'岛国米饭保你爱'))
p_pro2 = Process(target=producer,args=(q,'苍老师版'))
p_pro3 = Process(target=producer,args=(q,'波多多版'))
p_con1 = Process(target=consumer,args=(q,'alex','\033[31m'))
p_con2 = Process(target=consumer,args=(q,'wusir','\033[32m'))
p_l = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
[i.start() for i in p_l] ##循环开启进程
# 父进程如何感知到生产者子进程不再生产数据了?
p_pro1.join()
p_pro2.join()
p_pro3.join()
# 有几个消费者就要存入多少个结束标识
q.put(None)
q.put(None)
还有更好的解决方案,使用那个看个人吧,解决方案就在下一个JoinableQueue模块中
# multiprocess.JoinableQueue队列模块 - 重点
JoinableQueue模块是基于Queue队列模块的基础上,而且打开源码看,JoinableQueue模块的类是继承的Queue类,所以说,就算使用JoinableQueue模块,也是可以正常的使用Queue模块的大部分功能
创建可连接的共享队列进程。它就好像一个Queue对象,但是它自带光环,允许消费者通知生产者是不是已经消费完所有的数据了。通知进程是使用共享的信号和条件变量来实现的
# JoinableQueue模块的独特方法
from multiprocessing import JoinableQueue
q = JoinableQueue(n)
## 实例化对象,n:代表队列的最大容量
## 除了与Queue对象相同的方法之外,还具有以下方法:
q.join()
## 阻塞等待,等待消费者消费了所有数据,才会接着执行,用于生产者
## 用于生产者。等待 q.task_done的返回结果,通过返回结果,生产者就能获得消费者当前消费了多少个数据
## 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到消费者为队列中的每个数据均调用q.task_done()方法为止。
q.task_done()
## 每获取一个队列的值,task_done就会给join返回一个标识,说我以消费了一个值,方便join记录
## 用于消费者,是指每消费队列中一个数据,就给join返回一个标识。
## 消费者使用此方法发出信号,表示队列中放入的数据已经被处理。如果调用此方法的次数大于从队列中获取的数据数量,将引发ValueError异常。
# 假设生产者生产了100个数据,join就能记录下100这个数字。每次消费者消费一个数据,就必须要task_done返回一个标识,当生产者(join)接收到100个消费者返回来的标识的时候,生产者就能知道消费者已经把所有数据都消费完了。
# 实验 - 基于JoinableQueue队列的生产者消费者模型
## 单进程
from multiprocessing import JoinableQueue,Process
import time
def b(jq): ##消费者
while 1:
print("消费了%s元"%jq.get())
jq.task_done() ## 每获取一个值,就向join发送一个消费一个值的标识
def a(jq): ##生产者
for i in range(20):
jq.put(i)
jq.join() ## 记录进程存入队列的数量,等待消费者消费,只有完全消费完毕才会结束阻塞
if __name__ == '__main__':
jq = JoinableQueue(5)
p_a = Process(target=a,args=(jq,))
p_b = Process(target=b, args=(jq,))
p_b.daemon = True ## 把消费者设为守护进程,主进程结束,消费者进程也结束
p = [p_a,p_b]
[i.start() for i in p ]
p_a.join() ## 主进程阻塞等待生产者进程结束
# 程序有3个进程,主进程和生产者进程和消费者进程。 当主进程执行到join()时,主进程会等待生产进程结束
# 而生产进程中join()会等待消费者进程把所有数据消费完,生产者进程才结束。
# 现在的状态就是主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据
# 所以把消费者设置为守护进程。当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
# 此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。整个程序也就能正常结束了。
## 多进程
from multiprocessing import JoinableQueue,Process
import time
def b(jq): ##消费者
while 1:
print("消费了%s元"%jq.get())
jq.task_done() ## 每获取一个值,就向join发送一个消费一个值的标识
def a(jq): ##生产者
for i in range(20):
jq.put(i)
jq.join() ## 记录进程存入队列的数量,等待消费者消费,只有完全消费完毕才会结束阻塞
if __name__ == '__main__':
jq = JoinableQueue(5)
pa = []
pb = []
for s in range(3):
p_a = Process(target=a,args=(jq,))
pa.append(p_a)
p_b = Process(target=b, args=(jq,))
pb.append(p_b)
for i in pb: ## 把消费者设为守护进程,主进程结束,消费者进程也结束
i.daemon = True
[i.start() for i in pa+pb ]
[i.join() for i in pa] ## 主进程阻塞等待生产者进程结束
# multiprocessing.Pipe管道模块 - 了解
管道是不安全的,管道是用于多进程之间通信的一种方式
# Pipe模块的方法介绍
from multiprocessing import Pipe
conn1,conn2 = Pipe()
# 在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
# 如果在单进程中使用管道,
# 那么就是conn1收数据,就是conn2发数据。
# 如果是conn1发数据,就是conn2收数据
# 如果在多进程中使用管道,那么就必须是父进程使用conn1收,子进程就必须使用conn2发
# 父进程使用conn1发,子进程就必须使用conn2收
# 父进程使用conn2收,子进程就必须使用conn1发
# 父进程使用conn2发,子进程就必须使用conn1收
# 在管道中有一个著名的错误叫做EOFError。是指,父进程中如果关闭了发送端,子进程还继续接收数据,那么就会引发EOFError。
dumplex # 默认管道是全双工的,即conn1和conn2都是既能收数据也能发数据的,如果将duplex设成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
conn1.recv() # 接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
conn1.send(obj) # 通过连接发送对象。obj是与序列化兼容的任意对象
#其他方法:
conn1.close() #关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno() #返回连接使用的整数文件描述符
conn1.poll([timeout]) #如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout设成None,操作将无限期地等待数据到达。
conn1.recv_bytes([maxlength]) #接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]) #通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
conn1.recv_bytes_into(buffer [, offset]) #接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
# Pipe模块的简单使用
from multiprocessing import Process, Pipe
import time
def son(conn2):
time.sleep(1) ## 阻塞暂停1秒
print(conn2.recv()) ## 接收来自conn1发送的数据
conn2.send('我是你爷爷') ## 发送给conn1的数据
if __name__ == '__main__':
conn1, conn2 = Pipe()
p = Process(target=son, args=(conn2,))
p.start()
conn1.send('我是你爸爸!') # 发送给conn2的数据
time.sleep(1)
print(conn1.recv()) # 接收来自conn2的数据
p.join()
# 实例 - 多进程,无限接收数据异常
from multiprocessing import Pipe,Process
def func(con):
con1,con2 = con
con1.close()# 子进程使用con2和父进程通信
while 1:
try:
print(con2.recv())#当主进程的con1发数据时,子进程要死循环的去接收。
except EOFError:# 如果主进程的con1发完数据并关闭con1,子进程的con2继续接收时,就会报错,使用try的方式,获取错误
con2.close()# 获取到错误,就是指子进程已经把管道中所有数据都接收完了,所以用这种方式去关闭管道
break
if __name__ == '__main__':
con1,con2 = Pipe()
p = Process(target=func,args=((con1,con2),))
p.start()
con2.close()# 在父进程中,使用con1去和子进程通信,所以不需要con2,就提前关闭
for i in range(10):# 生产数据
con1.send(i)# 给子进程的con2发送数据
con1.close()# 生产完数据,关闭父进程这一端的管道
## 如果发送端发送完了数据,关闭了通道,那么接收端,还在接收就会报EOFError错误,所以,要做一个异常处理,让程序可以不停止的运行
# 实验 - 基于Pipe管道的生产者消费者模型
from multiprocessing import Pipe, Process
import time
import random
def consumer(conn, man):
conn1, conn2 = conn
conn1.close() ## 关闭当前进程conn1的管道通道
while 1:
try:
info = conn2.recv() ## 接收来自conn1的数据
time.sleep(random.random()) ## 随机暂停
print('\033[31m%s 取走了%s\033[0m' % (man, info))
except EOFError: ## 如果conn1发送端发送了所有的数据,关闭了管道,就会报EOFError错误,所以要做一个异常处理
print('\033[32m没有娃娃了,等下一期吧\033[0m')
conn2.close()
break # 借助肯定产生的EOFError异常来让程序结束
def producer(conn, pro):
conn1, conn2 = conn
conn2.close() ## 关闭当前进程的conn2的管道通道
for i in range(10):
info = '%s 的娃娃%s号' % (pro, i)
conn1.send(info) ## 向conn2发送数据
conn1.close() ## 所有数据发送完毕后,关闭管道
if __name__ == '__main__':
conn1, conn2 = Pipe()
cons1 = Process(target=consumer, args=((conn1, conn2), 'alex')) ## 实例化一个对象
prod1 = Process(target=producer, args=((conn1, conn2), '苍老师版')) ## 实例化一个对象
cons1.start()
prod1.start()
conn1.close() ## 关闭当前进程conn1的管道通道
conn2.close() ## 关闭当前进程conn2的管道通道
# 不写的话,消费者进程结束不了。 一定要记住,多进程中内核会对每个进程的管道进行计数,必须在所有进程中都关闭管道才会引发EOFError异常
# 实验 - 多消费者竞争数据带来数据不安全问题
from multiprocessing import Pipe, Process, Lock
def consumer(conn, man,color,l):
conn1, conn2 = conn
conn1.close()
while 1:
l.acquire()
info = conn2.recv()
l.release()
if info:
print('\033[%s %s 取走了%s\033[0m' % (color,man, info))
else:
conn2.close()
break
def producer(conn, pro):
conn1, conn2 = conn
conn2.close()
for i in range(20):
info = '%s 的娃娃%s号' % (pro, i)
conn1.send(info)
conn1.send(None)
conn1.send(None)
conn1.send(None)
conn1.close()
if __name__ == '__main__':
conn1, conn2 = Pipe()
l = Lock()
cons1 = Process(target=consumer, args=((conn1, conn2), 'alex','31m',l))
cons2 = Process(target=consumer, args=((conn1, conn2), 'Wusir','33m',l))
cons3 = Process(target=consumer, args=((conn1, conn2), '太白','34m',l))
cons4 = Process(target=consumer, args=((conn1, conn2), '彦涛','35m',l))
cons5 = Process(target=consumer, args=((conn1, conn2), 'AAA','36m',l))
cons6 = Process(target=consumer, args=((conn1, conn2), 'BBB','37m',l))
prod1 = Process(target=producer, args=((conn1, conn2), '苍老师版'))
prod2 = Process(target=producer, args=((conn1, conn2), '韩红版'))
l_p = [cons1,
cons2,
cons3,
cons4,
cons5,
cons6,
prod1,
prod2,
]
[i.start() for i in l_p]
conn1.close()
conn2.close()
[i.join() for i in l_p]
# multiprocessing.Manager进程共享内存模块
基于消息传递的并发编程是大势所趋
即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据。
这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中。
但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题。
以后我们会尝试使用数据库来解决现在进程之间的数据共享问题。
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
# Manager模块的方法介绍
from multiprocessing import Process,Manager
m = Manager()
num = m.类型(数据)
## 常用类型
num = m.dict({键 : 值}) ## 以元组类型传输
num = m.list([1,2,3]) ## 以列表类型传输
## 类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array
# Manager模块的简单使用
from multiprocessing import Process,Manager
def func(num):
num[0] -= 1
print('子进程中的num的值是',num)
if __name__ == '__main__':
m = Manager()
num = m.list([1,2,3])
p = Process(target=func,args=(num,))
p.start()
p.join()
print('父进程中的num的值是',num)
# 实验 - 多进程的变量共享
from multiprocessing import Manager,Process,Lock
def func(dic,lock):
# lock.acquire() # 使用manager模块,多进程共享数据时,如果不加锁,必然会造成数据混乱
dic[0] -= 1
# lock.release()
if __name__ == '__main__':
m = Manager()
lock = Lock()
s = m.list([50])
print(s)
l = []
for i in range(50):
p = Process(target=func,args=(s,lock))
p.start()
l.append(p)
[p.join() for p in l]
print(s[0])
# multiprocessing.Pool进程池模块 - 重点
进程池:
- 一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。
- 因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数
- 开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让,cpu帮你调度它。
- 进程池还会帮程序员去管理池中的进程。
为什么要有进程池?进程池的概念。
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?
在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿这个池中的进程来处理任务,等到处理完毕,进程并不结束,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
# Pool模块的方法介绍
from multiprocessing import Pool
p = Pool(n) ## 实例化一个对象,n:代表进程池中有多少进程,一般是电脑内核+1,可以用os模块的cpu_count(),来计算电脑有多少内核,os.cpu_count() + 1
进程池有三个启动模式:
map(func,iterable) # 可迭代对象模式
func:进程池中的进程执行的任务函数
iterable: 可迭代对象,是把可迭代对象中的每个元素依次传给任务函数当参数,可以使用列表表达式
apply(func,args=()): # 同步模式,同步的效率:也就是说池中的进程一个一个的去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
同步处理任务时,不需要close和join
同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)
apply_async(func,args=(),callback=None): 异步的效率,也就是说池中的进程一次性都去执行任务
func:进程池中的进程执行的任务函数
args: 可迭代对象型的参数,是传给任务函数的参数
异步处理任务时,进程池中的所有进程是守护进程(主进程代码执行完毕守护进程就结束)
异步处理任务时,必须要加上close和join
callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的
回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数
其他方法:
p.close() # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion() # 等待所有工作进程退出。此方法只能在close()或teminate()之后调用
## 一般这二个同时使用,要先关闭进程池,在等待所有进程退出,后退出
obj.get() ## 返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready() ## 如果调用完成,返回True
obj.successful() ## 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]) ## 等待结果变为可用。
obj.terminate() ## 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
# 实验 - 进程池和多进程效率对比
from multiprocessing import Pool,Process
import time
def func(i):
i += 1
if __name__ == '__main__':
p = Pool(4)# 创建进程池,池中有4个进程待命
start = time.time()
p.map(func,[i for i in range(100)]) ## 使用列表表达式迭代值供给对象使用
p.close() ## 关闭进程池,不让外部任务进入进程池
p.join() # 等待进程池中所有进程都执行完所有任务。
print(time.time() - start)# 打印进程池做任务的时间
start = time.time()
l = []
for i in range(100):
p = Process(target=func,args=(i,))
p.start()
l.append(p)
[i.join() for i in l]# 等待所有进程工作结束
print(time.time() - start)# 打印开启100个进程做任务的时间
执行结果:
0.5006439685821533
13.25691270828247
# 实验 - 进程池的同步调用模式
from multiprocessing import Pool
import time
import os
def func(i):
i+= 1
time.sleep(2)
print(i,os.getpid()) ## os.getpid():查看当前进程的PID
return i ## 返回值,返回给调用者
if __name__ == '__main__':
p = Pool(5)# 创建进程池,池中有5个进程待命
res_l = []
for i in range(10):
res = p.apply(func,args=(i,))# 有10个任务来,交给进程池中5个进程,5个进程同步执行任务直到拿到res
res_l.append(res) ## 把获取到的返回值,增加到列表中
print(res_l)
# 实验 - 进程池的异步调用模式
from multiprocessing import Pool
import time,os
def func(i):
i+= 1
time.sleep(2)
print(i,os.getpid()) ## os.getpid():查看当前进程的PID
return i ## 返回值,返回给调用者
if __name__ == '__main__':
p = Pool(5)# 创建进程池,池中有5个进程待命
res_l = []
for i in range(10):
res = p.apply_async(func,args=(i,))# 异步执行10个任务,每次最多5个进程同时运行
res_l.append(res) ## 把获取到的返回值,增加到列表中
p.close()
p.join()# 异步执行需要join,也就是让主进程等待进程池中所有进程执行完所有任务,否则可能进程池中的进程还没来得及执行任务,主进程就结束了。
for i in res_l:
print(i.get()) ## 异步要查看返回值,需要加上.get()方法
# 异步机制,从res的实例obj中get到实际结果,同步机制没有此方法
# 因为同步机制能直接拿到实际结果
# 其实get是阻塞等待的,也就是说,如果没有上边的close和join :
# 主进程一样会阻塞在get等待进程池中给返回结果,进程池异步执行任务获取结果
# 每次有一个进程返回结果后,就能get到一个结果,然后for循环到下一次继续阻塞等待拿结果
# 实验 - 同步跟异步模式效率的对比
from multiprocessing import Pool
import time
def num(i):
i += 10
if __name__ == '__main__':
p = Pool(5)
## 同步模式
start = time.time()
for i in range(100000):
rse = p.apply(num,args=(i,))
print(time.time() - start)
## 异步模式
start = time.time()
for i in range(100000):
rse = p.apply_async(num,args=(i,))
p.close()
p.join()
print(time.time() - start)
执行结果:
28.188576459884644
14.340880870819092
# 实验 - 进程池的异步模式的回调函数
回调函数的使用:
进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作
回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数
注意: 回调函数只能在异步模式的时候使用
from multiprocessing import Pool
import os
def num(i):
ia = i + 10
return i,ia ##返回
def dock(num):
i,ia = num
print("第%s波,回调函数输出,回调函数的PID:%s" %(i,os.getpid()))
if __name__ == '__main__':
p = Pool(5)
for i in range(100000):
rse = p.apply_async(num,args=(i,),callback=dock) ## 实例化一个对象,如果有回调函数,默认把进程的返回值,当做参数发送给回调函数指定的函数的参数中,并执行这函数
p.close()
p.join()
print("主进程的PID:",os.getpid())
执行结果:
## 只显示几行结果内容
第99999波,回调函数输出,回调函数的PID:15464
主进程的PID: 15464
## 注意,回调函数不是子进程调用的,是主进程调用的
# 实验 - 进程池实现socket并发聊天
## 服务端文件
from multiprocessing import Pool
import socket
SOURCE_ADDR = ('127.0.0.1', 8090)
def communication(conn):
while 1:
try:
msg_r = conn.recv(1024).decode('utf-8')
if not msg_r: break
print(msg_r)
conn.send(msg_r.upper().encode('utf-8'))
except Exception: # 可能接收比如客户端直接强制断开连接等错误。
print('结束')
break
if __name__ == '__main__':
p = Pool(4)
sk = socket.socket()
sk.bind(SOURCE_ADDR)
sk.listen()
while 1:
conn, addr = sk.accept()
p.apply_async(communication, args=(conn,))
## 客户端文件
import socket
sk = socket.socket()
SOURCE_ADDR = ('127.0.0.1',8090)
sk.connect(SOURCE_ADDR)
while 1:
msg_s = input('>>>')
if not msg_s:continue
sk.send(msg_s.encode('utf-8'))
print(sk.recv(1024).decode('utf-8'))
## 并发开启多个客户端,服务端同一时间只能接收4个客端的请求,当再有客户端请求时,需要等待,只能结束一个客户端,另外一个客户端才会进来